using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using EventStore.Core.Data;
using EventStore.Core.Messages;
using EventStore.Core.Messaging;
using EventStore.Client.Streams;
using Grpc.Core;
using Empty = EventStore.Client.Empty;

namespace EventStore.Core.Services.Transport.Grpc {
	internal partial class Streams<TStreamId> {
		public override async Task<AppendResp> Append(
			IAsyncStreamReader<AppendReq> requestStream,
			ServerCallContext context) {

			using var duration = _appendTracker.Start();
			try {
				if (!await requestStream.MoveNext().ConfigureAwait(false))
					throw new InvalidOperationException();

				if (requestStream.Current.ContentCase != AppendReq.ContentOneofCase.Options)
					throw new InvalidOperationException();

				var options = requestStream.Current.Options;
				var streamName = options.StreamIdentifier;

				var expectedVersion = options.ExpectedStreamRevisionCase switch {
					AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Revision => new StreamRevision(
						options.Revision).ToInt64(),
					AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Any => AnyStreamRevision.Any.ToInt64(),
					AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.StreamExists => AnyStreamRevision.StreamExists
						.ToInt64(),
					AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.NoStream =>
						AnyStreamRevision.NoStream.ToInt64(),
					_ => throw RpcExceptions.InvalidArgument(options.ExpectedStreamRevisionCase)
				};

				var requiresLeader = GetRequiresLeader(context.RequestHeaders);

				var user = context.GetHttpContext().User;
				var op = WriteOperation.WithParameter(
					Plugins.Authorization.Operations.Streams.Parameters.StreamId(streamName));
				if (!await _provider.CheckAccessAsync(user, op, context.CancellationToken).ConfigureAwait(false)) {
					throw RpcExceptions.AccessDenied();
				}

				var correlationId = Guid.NewGuid(); // TODO: JPB use request id?

				var events = new List<Event>();

				var size = 0;
				while (await requestStream.MoveNext().ConfigureAwait(false)) {
					if (requestStream.Current.ContentCase != AppendReq.ContentOneofCase.ProposedMessage)
						throw new InvalidOperationException();

					var proposedMessage = requestStream.Current.ProposedMessage;
					var data = proposedMessage.Data.ToByteArray();
					var metadata = proposedMessage.CustomMetadata.ToByteArray();

					if (!proposedMessage.Metadata.TryGetValue(Constants.Metadata.Type, out var eventType)) {
						throw RpcExceptions.RequiredMetadataPropertyMissing(Constants.Metadata.Type);
					}

					if (!proposedMessage.Metadata.TryGetValue(Constants.Metadata.ContentType, out var contentType)) {
						throw RpcExceptions.RequiredMetadataPropertyMissing(Constants.Metadata.ContentType);
					}

					size += Event.SizeOnDisk(eventType, data, metadata);

					if (size > _maxAppendSize) {
						throw RpcExceptions.MaxAppendSizeExceeded(_maxAppendSize);
					}

					events.Add(new Event(
						Uuid.FromDto(proposedMessage.Id).ToGuid(),
						eventType,
						contentType == Constants.Metadata.ContentTypes.ApplicationJson,
						data,
						metadata));
				}

				var appendResponseSource = new TaskCompletionSource<AppendResp>(TaskCreationOptions.RunContinuationsAsynchronously);

				var envelope = new CallbackEnvelope(HandleWriteEventsCompleted);

				_publisher.Publish(new ClientMessage.WriteEvents(
					correlationId,
					correlationId,
					envelope,
					requiresLeader,
					streamName,
					expectedVersion,
					events.ToArray(),
					user,
					cancellationToken: context.CancellationToken));

				return await appendResponseSource.Task.ConfigureAwait(false);

				void HandleWriteEventsCompleted(Message message) {
					if (message is ClientMessage.NotHandled notHandled &&
						RpcExceptions.TryHandleNotHandled(notHandled, out var ex)) {
						appendResponseSource.TrySetException(ex);
						return;
					}

					if (!(message is ClientMessage.WriteEventsCompleted completed)) {
						appendResponseSource.TrySetException(
							RpcExceptions.UnknownMessage<ClientMessage.WriteEventsCompleted>(message));
						return;
					}

					var response = new AppendResp();
					switch (completed.Result) {
						case OperationResult.Success:
							response.Success = new AppendResp.Types.Success();
							if (completed.LastEventNumber == -1) {
								response.Success.NoStream = new Empty();
							} else {
								response.Success.CurrentRevision = StreamRevision.FromInt64(completed.LastEventNumber);
							}

							if (completed.CommitPosition == -1) {
								response.Success.NoPosition = new Empty();
							} else {
								var position = Position.FromInt64(completed.CommitPosition, completed.PreparePosition);
								response.Success.Position = new AppendResp.Types.Position {
									CommitPosition = position.CommitPosition,
									PreparePosition = position.PreparePosition
								};
							}

							appendResponseSource.TrySetResult(response);
							return;
						case OperationResult.PrepareTimeout:
						case OperationResult.CommitTimeout:
						case OperationResult.ForwardTimeout:
							appendResponseSource.TrySetException(RpcExceptions.Timeout(completed.Message));
							return;
						case OperationResult.WrongExpectedVersion:
							response.WrongExpectedVersion = new AppendResp.Types.WrongExpectedVersion();

							switch (options.ExpectedStreamRevisionCase) {
								case AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Any:
									response.WrongExpectedVersion.ExpectedAny = new Empty();
									response.WrongExpectedVersion.Any2060 = new Empty();
									break;
								case AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.StreamExists:
									response.WrongExpectedVersion.ExpectedStreamExists = new Empty();
									response.WrongExpectedVersion.StreamExists2060 = new Empty();
									break;
								case AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.NoStream:
									response.WrongExpectedVersion.ExpectedNoStream = new Empty();
									response.WrongExpectedVersion.ExpectedRevision2060 = ulong.MaxValue;
									break;
								case AppendReq.Types.Options.ExpectedStreamRevisionOneofCase.Revision:
									response.WrongExpectedVersion.ExpectedRevision =
										StreamRevision.FromInt64(expectedVersion);
									response.WrongExpectedVersion.ExpectedRevision2060 =
										StreamRevision.FromInt64(expectedVersion);
									break;
							}

							if (completed.CurrentVersion == -1) {
								response.WrongExpectedVersion.CurrentNoStream = new Empty();
								response.WrongExpectedVersion.NoStream2060 = new Empty();
							} else {
								response.WrongExpectedVersion.CurrentRevision =
									StreamRevision.FromInt64(completed.CurrentVersion);
								response.WrongExpectedVersion.CurrentRevision2060 =
									StreamRevision.FromInt64(completed.CurrentVersion);
							}

							appendResponseSource.TrySetResult(response);
							return;
						case OperationResult.StreamDeleted:
							appendResponseSource.TrySetException(RpcExceptions.StreamDeleted(streamName));
							return;
						case OperationResult.InvalidTransaction:
							appendResponseSource.TrySetException(RpcExceptions.InvalidTransaction());
							return;
						case OperationResult.AccessDenied:
							appendResponseSource.TrySetException(RpcExceptions.AccessDenied());
							return;
						default:
							appendResponseSource.TrySetException(RpcExceptions.UnknownError(completed.Result));
							return;
					}
				}
			} catch (Exception ex) {
				duration.SetException(ex);
				throw;
			}
		}
	}
}
